{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Copyright 2019 The Kubeflow Authors. All Rights Reserved.\n",
    "#\n",
    "# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
    "# you may not use this file except in compliance with the License.\n",
    "# You may obtain a copy of the License at\n",
    "#\n",
    "#      http://www.apache.org/licenses/LICENSE-2.0\n",
    "#\n",
    "# Unless required by applicable law or agreed to in writing, software\n",
    "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
    "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
    "# See the License for the specific language governing permissions and\n",
    "# limitations under the License.\n",
    "# =============================================================================="
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Composing a pipeline from reusable, pre-built, and lightweight components\n",
    "\n",
    "This tutorial describes how to build a Kubeflow pipeline from reusable, pre-built, and lightweight components. The following provides a summary of the steps involved in creating and using a reusable component:\n",
    "\n",
    "- Write the program that contains your component’s logic. The program must use files and command-line arguments to pass data to and from the component.\n",
    "- Containerize the program.\n",
    "- Write a component specification in YAML format that describes the component for the Kubeflow Pipelines system.\n",
    "- Use the Kubeflow Pipelines SDK to load your component, use it in a pipeline and run that pipeline.\n",
    "\n",
    "Then, we will compose a pipeline from a reusable component, a pre-built component, and a lightweight component. The pipeline will perform the following steps:\n",
    "- Train an MNIST model and export it to Google Cloud Storage.\n",
    "- Deploy the exported TensorFlow model on AI Platform Prediction service.\n",
    "- Test the deployment by calling the endpoint with test data."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note: Ensure that you have Docker installed, if you want to build the image locally, by running the following command:\n",
    " \n",
    "`which docker`\n",
    " \n",
    "The result should be something like:\n",
    "\n",
    "`/usr/bin/docker`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp\n",
    "import kfp.gcp as gcp\n",
    "import kfp.dsl as dsl\n",
    "import kfp.compiler as compiler\n",
    "import kfp.components as comp\n",
    "import datetime\n",
    "\n",
    "import kubernetes as k8s"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "parameter"
    ]
   },
   "outputs": [],
   "source": [
    "# Required Parameters\n",
    "PROJECT_ID='<ADD GCP PROJECT HERE>'\n",
    "GCS_BUCKET='gs://<ADD STORAGE LOCATION HERE>'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create client\n",
    "\n",
    "If you run this notebook **outside** of a Kubeflow cluster, run the following command:\n",
    "- `host`: The URL of your Kubeflow Pipelines instance, for example \"https://`<your-deployment>`.endpoints.`<your-project>`.cloud.goog/pipeline\"\n",
    "- `client_id`: The client ID used by Identity-Aware Proxy\n",
    "- `other_client_id`: The client ID used to obtain the auth codes and refresh tokens.\n",
    "- `other_client_secret`: The client secret used to obtain the auth codes and refresh tokens.\n",
    "\n",
    "```python\n",
    "client = kfp.Client(host, client_id, other_client_id, other_client_secret)\n",
    "```\n",
    "\n",
    "If you run this notebook **within** a Kubeflow cluster, run the following command:\n",
    "```python\n",
    "client = kfp.Client()\n",
    "```\n",
    "\n",
    "You'll need to create OAuth client ID credentials of type `Other` to get `other_client_id` and `other_client_secret`. Learn more about [creating OAuth credentials](\n",
    "https://cloud.google.com/iap/docs/authentication-howto#authenticating_from_a_desktop_app)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional Parameters, but required for running outside Kubeflow cluster\n",
    "\n",
    "# The host for 'AI Platform Pipelines' ends with 'pipelines.googleusercontent.com'\n",
    "# The host for pipeline endpoint of 'full Kubeflow deployment' ends with '/pipeline'\n",
    "# Examples are:\n",
    "# https://7c021d0340d296aa-dot-us-central2.pipelines.googleusercontent.com\n",
    "# https://kubeflow.endpoints.kubeflow-pipeline.cloud.goog/pipeline\n",
    "HOST = '<ADD HOST NAME TO TALK TO KUBEFLOW PIPELINE HERE>'\n",
    "\n",
    "# For 'full Kubeflow deployment' on GCP, the endpoint is usually protected through IAP, therefore the following \n",
    "# will be needed to access the endpoint.\n",
    "CLIENT_ID = '<ADD OAuth CLIENT ID USED BY IAP HERE>'\n",
    "OTHER_CLIENT_ID = '<ADD OAuth CLIENT ID USED TO OBTAIN AUTH CODES HERE>'\n",
    "OTHER_CLIENT_SECRET = '<ADD OAuth CLIENT SECRET USED TO OBTAIN AUTH CODES HERE>'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This is to ensure the proper access token is present to reach the end point for 'AI Platform Pipelines'\n",
    "# If you are not working with 'AI Platform Pipelines', this step is not necessary\n",
    "! gcloud auth print-access-token"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create kfp client\n",
    "in_cluster = True\n",
    "try:\n",
    "  k8s.config.load_incluster_config()\n",
    "except:\n",
    "  in_cluster = False\n",
    "  pass\n",
    "\n",
    "if in_cluster:\n",
    "    client = kfp.Client()\n",
    "else:\n",
    "    if HOST.endswith('googleusercontent.com'):\n",
    "        CLIENT_ID = None\n",
    "        OTHER_CLIENT_ID = None\n",
    "        OTHER_CLIENT_SECRET = None\n",
    "\n",
    "    client = kfp.Client(host=HOST, \n",
    "                        client_id=CLIENT_ID,\n",
    "                        other_client_id=OTHER_CLIENT_ID, \n",
    "                        other_client_secret=OTHER_CLIENT_SECRET)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Build reusable components"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Writing the program code"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following cell creates a file `app.py` that contains a Python script. The script downloads MNIST dataset, trains a Neural Network based classification model, writes the training log and exports the trained model to Google Cloud Storage.\n",
    "\n",
    "Your component can create outputs that the downstream components can use as inputs. Each output must be a string and the container image must write each output to a separate local text file. For example, if a training component needs to output the path of the trained model, the component writes the path into a local file, such as `/output.txt`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "\n",
    "# Create folders if they don't exist.\n",
    "mkdir -p tmp/reuse_components_pipeline/mnist_training\n",
    "\n",
    "# Create the Python file that lists GCS blobs.\n",
    "cat > ./tmp/reuse_components_pipeline/mnist_training/app.py <<HERE\n",
    "import argparse\n",
    "from datetime import datetime\n",
    "import tensorflow as tf\n",
    "\n",
    "parser = argparse.ArgumentParser()\n",
    "parser.add_argument(\n",
    "    '--model_path', type=str, required=True, help='Name of the model file.')\n",
    "parser.add_argument(\n",
    "    '--bucket', type=str, required=True, help='GCS bucket name.')\n",
    "args = parser.parse_args()\n",
    "\n",
    "bucket=args.bucket\n",
    "model_path=args.model_path\n",
    "\n",
    "model = tf.keras.models.Sequential([\n",
    "  tf.keras.layers.Flatten(input_shape=(28, 28)),\n",
    "  tf.keras.layers.Dense(512, activation=tf.nn.relu),\n",
    "  tf.keras.layers.Dropout(0.2),\n",
    "  tf.keras.layers.Dense(10, activation=tf.nn.softmax)\n",
    "])\n",
    "\n",
    "model.compile(optimizer='adam',\n",
    "              loss='sparse_categorical_crossentropy',\n",
    "              metrics=['accuracy'])\n",
    "\n",
    "print(model.summary())    \n",
    "\n",
    "mnist = tf.keras.datasets.mnist\n",
    "(x_train, y_train),(x_test, y_test) = mnist.load_data()\n",
    "x_train, x_test = x_train / 255.0, x_test / 255.0\n",
    "\n",
    "callbacks = [\n",
    "  tf.keras.callbacks.TensorBoard(log_dir=bucket + '/logs/' + datetime.now().date().__str__()),\n",
    "  # Interrupt training if val_loss stops improving for over 2 epochs\n",
    "  tf.keras.callbacks.EarlyStopping(patience=2, monitor='val_loss'),\n",
    "]\n",
    "\n",
    "model.fit(x_train, y_train, batch_size=32, epochs=5, callbacks=callbacks,\n",
    "          validation_data=(x_test, y_test))\n",
    "\n",
    "from tensorflow import gfile\n",
    "\n",
    "gcs_path = bucket + \"/\" + model_path\n",
    "# The export require the folder is new\n",
    "if gfile.Exists(gcs_path):\n",
    "    gfile.DeleteRecursively(gcs_path)\n",
    "tf.keras.experimental.export_saved_model(model, gcs_path)\n",
    "\n",
    "with open('/output.txt', 'w') as f:\n",
    "  f.write(gcs_path)\n",
    "HERE"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a Docker container\n",
    "Create your own container image that includes your program. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Creating a Dockerfile"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now create a container that runs the script. Start by creating a Dockerfile. A Dockerfile contains the instructions to assemble a Docker image. The `FROM` statement specifies the Base Image from which you are building. `WORKDIR` sets the working directory. When you assemble the Docker image, `COPY` copies the required files and directories (for example, `app.py`) to the file system of the container. `RUN` executes a command (for example, install the dependencies) and commits the results. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "\n",
    "# Create Dockerfile.\n",
    "# AI platform only support tensorflow 1.14\n",
    "cat > ./tmp/reuse_components_pipeline/mnist_training/Dockerfile <<EOF\n",
    "FROM tensorflow/tensorflow:1.14.0-py3\n",
    "WORKDIR /app\n",
    "COPY . /app\n",
    "EOF"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Build docker image"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now that we have created our Dockerfile for creating our Docker image. Then we need to build the image and push to a registry to host the image. There are three possible options:\n",
    "- Use the `kfp.containers.build_image_from_working_dir` to build the image and push to the Container Registry (GCR). This requires [kaniko](https://cloud.google.com/blog/products/gcp/introducing-kaniko-build-container-images-in-kubernetes-and-google-container-builder-even-without-root-access), which will be auto-installed with 'full Kubeflow deployment' but not 'AI Platform Pipelines'.\n",
    "- Use [Cloud Build](https://cloud.google.com/cloud-build), which would require the setup of GCP project and enablement of corresponding API. If you are working with GCP 'AI Platform Pipelines' with GCP project running, it is recommended to use Cloud Build.\n",
    "- Use [Docker](https://www.docker.com/get-started) installed locally and push to e.g. GCR."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Note**:\n",
    "If you run this notebook **within Kubeflow cluster**, **with Kubeflow version >= 0.7** and exploring **kaniko option**, you need to ensure that valid credentials are created within your notebook's namespace.\n",
    "- With Kubeflow version >= 0.7, the credential is supposed to be copied automatically while creating notebook through `Configurations`, which doesn't work properly at the time of creating this notebook. \n",
    "- You can also add credentials to the new namespace by either [copying credentials from an existing Kubeflow namespace, or by creating a new service account](https://www.kubeflow.org/docs/gke/authentication/#kubeflow-v0-6-and-before-gcp-service-account-key-as-secret).\n",
    "- The following cell demonstrates how to copy the default secret to your own namespace.\n",
    "\n",
    "```bash\n",
    "%%bash\n",
    "\n",
    "NAMESPACE=<your notebook name space>\n",
    "SOURCE=kubeflow\n",
    "NAME=user-gcp-sa\n",
    "SECRET=$(kubectl get secrets \\${NAME} -n \\${SOURCE} -o jsonpath=\"{.data.\\${NAME}\\.json}\" | base64 -D)\n",
    "kubectl create -n \\${NAMESPACE} secret generic \\${NAME} --from-literal=\"\\${NAME}.json=\\${SECRET}\"\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "IMAGE_NAME=\"mnist_training_kf_pipeline\"\n",
    "TAG=\"latest\" # \"v_$(date +%Y%m%d_%H%M%S)\"\n",
    "\n",
    "GCR_IMAGE=\"gcr.io/{PROJECT_ID}/{IMAGE_NAME}:{TAG}\".format(\n",
    "    PROJECT_ID=PROJECT_ID,\n",
    "    IMAGE_NAME=IMAGE_NAME,\n",
    "    TAG=TAG\n",
    ")\n",
    "\n",
    "APP_FOLDER='./tmp/reuse_components_pipeline/mnist_training/'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# In the following, for the purpose of demonstration\n",
    "# Cloud Build is choosen for 'AI Platform Pipelines'\n",
    "# kaniko is choosen for 'full Kubeflow deployment'\n",
    "\n",
    "if HOST.endswith('googleusercontent.com'):\n",
    "    # kaniko is not pre-installed with 'AI Platform Pipelines'\n",
    "    import subprocess\n",
    "    # ! gcloud builds submit --tag ${IMAGE_NAME} ${APP_FOLDER}\n",
    "    cmd = ['gcloud', 'builds', 'submit', '--tag', GCR_IMAGE, APP_FOLDER]\n",
    "    build_log = (subprocess.run(cmd, stdout=subprocess.PIPE).stdout[:-1].decode('utf-8'))\n",
    "    print(build_log)\n",
    "    \n",
    "else:\n",
    "    if kfp.__version__ <= '0.1.36':\n",
    "        # kfp with version 0.1.36+ introduce broken change that will make the following code not working\n",
    "        import subprocess\n",
    "        \n",
    "        builder = kfp.containers._container_builder.ContainerBuilder(\n",
    "            gcs_staging=GCS_BUCKET + \"/kfp_container_build_staging\"\n",
    "        )\n",
    "\n",
    "        kfp.containers.build_image_from_working_dir(\n",
    "            image_name=GCR_IMAGE,\n",
    "            working_dir=APP_FOLDER,\n",
    "            builder=builder\n",
    "        )\n",
    "    else:\n",
    "        raise(\"Please build the docker image use either [Docker] or [Cloud Build]\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### If you want to use docker to build the image\n",
    "Run the following in a cell\n",
    "```bash\n",
    "%%bash -s \"{PROJECT_ID}\"\n",
    "\n",
    "IMAGE_NAME=\"mnist_training_kf_pipeline\"\n",
    "TAG=\"latest\" # \"v_$(date +%Y%m%d_%H%M%S)\"\n",
    "\n",
    "# Create script to build docker image and push it.\n",
    "cat > ./tmp/components/mnist_training/build_image.sh <<HERE\n",
    "PROJECT_ID=\"${1}\"\n",
    "IMAGE_NAME=\"${IMAGE_NAME}\"\n",
    "TAG=\"${TAG}\"\n",
    "GCR_IMAGE=\"gcr.io/\\${PROJECT_ID}/\\${IMAGE_NAME}:\\${TAG}\"\n",
    "docker build -t \\${IMAGE_NAME} .\n",
    "docker tag \\${IMAGE_NAME} \\${GCR_IMAGE}\n",
    "docker push \\${GCR_IMAGE}\n",
    "docker image rm \\${IMAGE_NAME}\n",
    "docker image rm \\${GCR_IMAGE}\n",
    "HERE\n",
    "\n",
    "cd tmp/components/mnist_training\n",
    "bash build_image.sh\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "image_name = GCR_IMAGE"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Writing your component definition file\n",
    "To create a component from your containerized program, you must write a component specification in YAML that describes the component for the Kubeflow Pipelines system.\n",
    "\n",
    "For the complete definition of a Kubeflow Pipelines component, see the [component specification](https://www.kubeflow.org/docs/pipelines/reference/component-spec/). However, for this tutorial you don’t need to know the full schema of the component specification. The notebook provides enough information to complete the tutorial.\n",
    "\n",
    "Start writing the component definition (component.yaml) by specifying your container image in the component’s implementation section:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash -s \"{image_name}\"\n",
    "\n",
    "GCR_IMAGE=\"${1}\"\n",
    "echo ${GCR_IMAGE}\n",
    "\n",
    "# Create Yaml\n",
    "# the image uri should be changed according to the above docker image push output\n",
    "\n",
    "cat > mnist_pipeline_component.yaml <<HERE\n",
    "name: Mnist training\n",
    "description: Train a mnist model and save to GCS\n",
    "inputs:\n",
    "  - name: model_path\n",
    "    description: 'Path of the tf model.'\n",
    "    type: String\n",
    "  - name: bucket\n",
    "    description: 'GCS bucket name.'\n",
    "    type: String\n",
    "outputs:\n",
    "  - name: gcs_model_path\n",
    "    description: 'Trained model path.'\n",
    "    type: GCSPath\n",
    "implementation:\n",
    "  container:\n",
    "    image: ${GCR_IMAGE}\n",
    "    command: [\n",
    "      python, /app/app.py,\n",
    "      --model_path, {inputValue: model_path},\n",
    "      --bucket,     {inputValue: bucket},\n",
    "    ]\n",
    "    fileOutputs:\n",
    "      gcs_model_path: /output.txt\n",
    "HERE"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "mnist_train_op = kfp.components.load_component_from_file(os.path.join('./', 'mnist_pipeline_component.yaml')) "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "mnist_train_op.component_spec"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Define deployment operation on AI Platform"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "mlengine_deploy_op = comp.load_component_from_url(\n",
    "    'https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.3/components/gcp/ml_engine/deploy/component.yaml')\n",
    "\n",
    "def deploy(\n",
    "    project_id,\n",
    "    model_uri,\n",
    "    model_id,\n",
    "    runtime_version,\n",
    "    python_version):\n",
    "    \n",
    "    return mlengine_deploy_op(\n",
    "        model_uri=model_uri,\n",
    "        project_id=project_id, \n",
    "        model_id=model_id, \n",
    "        runtime_version=runtime_version, \n",
    "        python_version=python_version,\n",
    "        replace_existing_version=True, \n",
    "        set_default=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Kubeflow serving deployment component as an option. **Note that, the deployed Endppoint URI is not availabe as output of this component.**\n",
    "```python\n",
    "kubeflow_deploy_op = comp.load_component_from_url(\n",
    "    'https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.3/components/gcp/ml_engine/deploy/component.yaml')\n",
    "\n",
    "def deploy_kubeflow(\n",
    "    model_dir,\n",
    "    tf_server_name):\n",
    "    return kubeflow_deploy_op(\n",
    "        model_dir=model_dir,\n",
    "        server_name=tf_server_name,\n",
    "        cluster_name='kubeflow', \n",
    "        namespace='kubeflow',\n",
    "        pvc_name='', \n",
    "        service_type='ClusterIP')\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create a lightweight component for testing the deployment"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def deployment_test(project_id: str, model_name: str, version: str) -> str:\n",
    "\n",
    "    model_name = model_name.split(\"/\")[-1]\n",
    "    version = version.split(\"/\")[-1]\n",
    "    \n",
    "    import googleapiclient.discovery\n",
    "    \n",
    "    def predict(project, model, data, version=None):\n",
    "      \"\"\"Run predictions on a list of instances.\n",
    "\n",
    "      Args:\n",
    "        project: (str), project where the Cloud ML Engine Model is deployed.\n",
    "        model: (str), model name.\n",
    "        data: ([[any]]), list of input instances, where each input instance is a\n",
    "          list of attributes.\n",
    "        version: str, version of the model to target.\n",
    "\n",
    "      Returns:\n",
    "        Mapping[str: any]: dictionary of prediction results defined by the model.\n",
    "      \"\"\"\n",
    "\n",
    "      service = googleapiclient.discovery.build('ml', 'v1')\n",
    "      name = 'projects/{}/models/{}'.format(project, model)\n",
    "\n",
    "      if version is not None:\n",
    "        name += '/versions/{}'.format(version)\n",
    "\n",
    "      response = service.projects().predict(\n",
    "          name=name, body={\n",
    "              'instances': data\n",
    "          }).execute()\n",
    "\n",
    "      if 'error' in response:\n",
    "        raise RuntimeError(response['error'])\n",
    "\n",
    "      return response['predictions']\n",
    "\n",
    "    import tensorflow as tf\n",
    "    import json\n",
    "    \n",
    "    mnist = tf.keras.datasets.mnist\n",
    "    (x_train, y_train),(x_test, y_test) = mnist.load_data()\n",
    "    x_train, x_test = x_train / 255.0, x_test / 255.0\n",
    "\n",
    "    result = predict(\n",
    "        project=project_id,\n",
    "        model=model_name,\n",
    "        data=x_test[0:2].tolist(),\n",
    "        version=version)\n",
    "    print(result)\n",
    "    \n",
    "    return json.dumps(result)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# # Test the function with already deployed version\n",
    "# deployment_test(\n",
    "#     project_id=PROJECT_ID,\n",
    "#     model_name=\"mnist\",\n",
    "#     version='ver_bb1ebd2a06ab7f321ad3db6b3b3d83e6' # previous deployed version for testing\n",
    "# )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "deployment_test_op = comp.func_to_container_op(\n",
    "    func=deployment_test, \n",
    "    base_image=\"tensorflow/tensorflow:1.15.0-py3\",\n",
    "    packages_to_install=[\"google-api-python-client==1.7.8\"])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create your workflow as a Python function"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Define your pipeline as a Python function. ` @kfp.dsl.pipeline` is a required decoration, and must include `name` and `description` properties. Then compile the pipeline function. After the compilation is completed, a pipeline file is created."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Define the pipeline\n",
    "@dsl.pipeline(\n",
    "   name='Mnist pipeline',\n",
    "   description='A toy pipeline that performs mnist model training.'\n",
    ")\n",
    "def mnist_reuse_component_deploy_pipeline(\n",
    "    project_id: str = PROJECT_ID,\n",
    "    model_path: str = 'mnist_model', \n",
    "    bucket: str = GCS_BUCKET\n",
    "):\n",
    "    train_task = mnist_train_op(\n",
    "        model_path=model_path, \n",
    "        bucket=bucket\n",
    "    ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
    "    \n",
    "    deploy_task = deploy(\n",
    "        project_id=project_id,\n",
    "        model_uri=train_task.outputs['gcs_model_path'],\n",
    "        model_id=\"mnist\", \n",
    "        runtime_version=\"1.14\",\n",
    "        python_version=\"3.5\"\n",
    "    ).apply(gcp.use_gcp_secret('user-gcp-sa'))  \n",
    "    \n",
    "    deploy_test_task = deployment_test_op(\n",
    "        project_id=project_id,\n",
    "        model_name=deploy_task.outputs[\"model_name\"], \n",
    "        version=deploy_task.outputs[\"version_name\"],\n",
    "    ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
    "    \n",
    "    return True"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Submit a pipeline run"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_func = mnist_reuse_component_deploy_pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "experiment_name = 'minist_kubeflow'\n",
    "\n",
    "arguments = {\"model_path\":\"mnist_model\",\n",
    "             \"bucket\":GCS_BUCKET}\n",
    "\n",
    "run_name = pipeline_func.__name__ + ' run'\n",
    "\n",
    "# Submit pipeline directly from pipeline function\n",
    "run_result = client.create_run_from_pipeline_func(pipeline_func, \n",
    "                                                  experiment_name=experiment_name, \n",
    "                                                  run_name=run_name, \n",
    "                                                  arguments=arguments)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**As an alternative, you can compile the pipeline into a package.** The compiled pipeline can be easily shared and reused by others to run the pipeline.\n",
    "\n",
    "```python\n",
    "pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'\n",
    "compiler.Compiler().compile(pipeline_func, pipeline_filename)\n",
    "\n",
    "experiment = client.create_experiment('python-functions-mnist')\n",
    "\n",
    "run_result = client.run_pipeline(\n",
    "    experiment_id=experiment.id, \n",
    "    job_name=run_name, \n",
    "    pipeline_package_path=pipeline_filename, \n",
    "    params=arguments)\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "virtualPython35",
   "language": "python",
   "name": "virtualpython35"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
